#' 提取工具变量
#'
#'
#' @param outcomes 只能是以下选项之一：
#'  1.单个openGWAS ID（例如"ieu-b-102"） 或多个openGWAS ID 例如（ c("ieu-b-102","ieu-b-103" ) ）;
#'  2.单个数据框，U1_Clean_data清洗过后的df;
#'  3.单个parquet文件路径（例如"F:/OneclickDatabase/Oneclick_GWAS_ID/Oneclick_a_0001.parquet"）
#'    或多个parquet文件路径（例如c("F:/OneclickDatabase/Oneclick_GWAS_ID/Oneclick_a_0001.parquet",
#'                                 "F:/OneclickDatabase/Oneclick_GWAS_ID/Oneclick_a_0002.parquet")）,
#'
#' @param p 是提取工具变量的阈值，默认的是5e-08，提取不到工具变量可以增大为1e-05，5e-06，1e-06等等
#' @param clump 默认为在线去除连锁不平衡，为"online", 本地去除为"local",推荐本地去除，可以降低Server code: 502的概率，
#'      但是需要提供plink_bin参数的plink软件和1000基因组的bfile模板文件
#'
#' @param r2 默认是0.001， 0.01等也可使用
#' @param kb 默认是10000， 1000,500等也可使用
#' @param max_retries 是重试计算器，默认是10，不能超过100，这个参数是对Server code: 502的一种解决办法
#' @param pop 默认是欧洲"EUR" (European), 其他的还有"SAS" (1000K South Asian), "EAS" (East Asian), "AFR" (African), "AMR" (American)
#' @param bfile 本地clump时需要的文件，里面应该有的文件是“EUR.bed，EUR.bim，EUR.fam 等等”，U1_set_file_path功能设置过路径后就不需要填了。
#'
#' @param plink_bin plink软件的位置，里面应该有的文件是“plink_Windows，plink_Darwin等”，U1_set_file_path功能设置过路径后就不需要填了。
#'
#'
#' @return 工具变量
#' @export
#'
#' @examples
#'
#' \dontrun{
#'
#' outcomes <- c("ieu-b-102","ieu-b-103" )
#'
#' # clump = "online"的时候无需提供bfile和plink_bin
#' ivs <- U2_extract_instruments(outcomes,
#'                              p = 5e-08,
#'                              clump = "online",
#'                              r2 = 0.001,
#'                              kb = 10000,
#'                              max_retries = 3,
#'                              pop = "EUR")
#'
#' # clump = "local" 使用时要提供bfile和plink_bin，U1_set_file_path功能设置过路径后就不需要填了
#'
#' ivs <- U2_extract_instruments(outcomes,
#'                              p = 5e-08,
#'                              clump = "local",
#'                              r2 = 0.001,
#'                              kb = 10000,
#'                              max_retries = 3,
#'                              pop = "EUR")
#'
#' # 这种情况可以合并使用，但是必须是list
#'  outcome<-c("ebi-a-GCST90020053",
#'   "finngen_R10_M13_DISCINFECTION.parquet") %>%
#'     split( list(.) )
#'
#' ivs <- U2_extract_instruments(outcome,
#'                              p = 5e-08,
#'                              clump = "local",
#'                              r2 = 0.001,
#'                              kb = 10000,
#'                              max_retries = 3,
#'                              pop = "EUR")
#'
#'
#'
#'
#'
#'
#' }
#'
#'
#'
U2_extract_instruments <- function(  outcomes,
                                     p = 5e-08,
                                     clump = "online",
                                     r2 = 0.001,
                                     kb = 10000,
                                     max_retries = 10,
                                     pop = "EUR",
                                     bfile = "",
                                     plink_bin = "" ){

  stopifnot(max_retries <= 100)
  stopifnot(clump %in% c("online","local"))
  stopifnot(pop %in% c("EUR", "SAS", "EAS", "AFR", "AMR"))

  if(!pop=="EUR"){ clump <- "local"
  message("如果不是EUR人群，就只能本地clump")
  }

  message("
##############################################################################
对应的论文部分（降重后使用）：
  SNP Inclusion: SNPS associated with at a p-value threshold of p < ",p," were included in this analysis.
  LD Clumping: For standard two sample MR it is important to ensure that the instruments for the exposure are independent. LD clumping was performed in PLINK using the ",pop," samples from the 1000 Genomes Project to estimate LD between SNPs, and, amongst SNPs that have an LD above a given threshold, retain only the SNP with the lowest p-value. A clumping window of ",kb,", r2 of ",r2," and significance level of 1 was used for the index and secondary SNPs.
##############################################################################"
          )

  outcomes<-give_class(outcomes)

  if(file.exists( paste0(file.path( tools::R_user_dir("Oneclick", which = "data") , "path.Rdata" )))){
    load(paste0(file.path( tools::R_user_dir("Oneclick", which = "data") , "path.Rdata" )))
  }else{
    stop("请用U1_set_file_path功能设置附加文件夹")
  }



  if(bfile==""){
    bfile <- file.path(path,"bfile")
  }

  if(plink_bin==""){
    os <- Sys.info()["sysname"]
    a <- paste0("plink/plink_", os)
    if (os == "Windows"){ a <- paste0(a, ".exe") }
    plink_bin <- file.path(path,a)
    if (!file.exists(plink_bin)) {
      stop("请下载相应系统的plink软件并放置在合适的位置！")
    }
  }

  IVs <- U2_extract_instrument( outcomes=outcomes,
                               p = p ,
                               clump = clump,
                               r2 = r2,
                               kb = kb,
                               max_retries = max_retries,
                               pop = pop,
                               bfile = bfile,
                               plink_bin = plink_bin  )

  if( ( "ncase.exposure" %in% colnames(IVs)) & ( "ncontrol.exposure" %in% colnames(IVs) ) ){
    IVs$effective_n.exposure<- ifelse( !is.na(IVs$ncase.exposure) & !is.na(IVs$ncontrol.exposure),
                                       TwoSampleMR::effective_n(IVs$ncase.exposure,IVs$ncontrol.exposure ),
                                       IVs$samplesize.exposure
    )
  }else{
    IVs$effective_n.exposure<- IVs$samplesize.exposure
  }

  if("eaf.exposure" %in% colnames(IVs) ){

    message("使用2*eaf*(1-eaf)*beta^2/(2*eaf*(1-eaf)*beta^2+ 2*eaf*(1-eaf)*effective_n*se^2  )的公式计算R2")

    IVs$R2<-with(IVs,
                 (2*eaf.exposure*(1-eaf.exposure)*beta.exposure*beta.exposure)/(2*eaf.exposure*(1-eaf.exposure)*beta.exposure*beta.exposure+2*eaf.exposure*(1-eaf.exposure)*effective_n.exposure*se.exposure*se.exposure)
    )
  }else if("maf.exposure" %in% colnames(IVs)){
    IVs$R2<-with(IVs,
                 (2*maf.exposure*(1-maf.exposure)*beta.exposure*beta.exposure)/(2*maf.exposure*(1-maf.exposure)*beta.exposure*beta.exposure+2*maf.exposure*(1-maf.exposure)*effective_n.exposure*se.exposure*se.exposure)
    )
  }


  message("使用beta^2/se^2公式计算单个F(singleF)值")

  IVs <- IVs %>%
    dplyr::mutate(singleF = beta.exposure^2/(se.exposure^2) )



  IVs$clump_p<-p
  IVs$clump_r2<-r2
  IVs$clump_kb<-kb

  return( IVs )

}


U2_extract_instrument <- function(outcomes,
                                  p = 5e-08,
                                  clump = "online",
                                  r2 = 0.001,
                                  kb = 10000,
                                  max_retries = 10,
                                  pop = "EUR",
                                  bfile = "",
                                  plink_bin = ""){
  UseMethod( "extract_instrument" )
}


extract_instrument.openGWAS <- function(outcomes,
                                       p = 5e-08,
                                       clump = "online",
                                       r2 = 0.001,
                                       kb = 10000,
                                       max_retries = 10,
                                       pop = "EUR",
                                       bfile = "",
                                       plink_bin = ""){


  suppressPackageStartupMessages(require(TwoSampleMR,quietly = TRUE))

      message("推断为openGWAS的ID")

      message("提取pval低于",p,"值的SNP中......")

     if(clump == "online"){

      pb <- progress::progress_bar$new(total = length(outcomes))

        IVs <- c()

      for (i in 1:length(outcomes)) {
        dat_IV <- retry( operation = TwoSampleMR::extract_instruments,
                                args=list(outcomes= outcomes[i],p1 = p, clump = TRUE) ,
                                max_retries = max_retries )



         IVs <- plyr::rbind.fill(IVs,dat_IV )

      if(is.null(IVs)){ warning( paste0("clump后找不到SNP，建议提高筛选的P值")  )

        return(IVs)

      }else{
        IVs <- IVs %>%
          TwoSampleMR::split_exposure()

      }


        pb$tick()
      }

     }else{

       dat_IVs <- c()

       pb <- progress::progress_bar$new(total = length(outcomes))
       for (i in 1:length(outcomes)) {
         dat_IV <- retry( operation = TwoSampleMR::extract_instruments,
                          args=list(outcomes= outcomes[i],p1 = p, clump = FALSE) ,
                          max_retries = max_retries )
         dat_IVs <- plyr::rbind.fill(dat_IVs,dat_IV )
         pb$tick()
       }

    if( is.null(dat_IVs) ){ warning( paste0("即使在不clump的情况下也找不到比 ",p," 这个p值更小的SNP，建议提高筛选的P值")  )
      return(dat_IVs)
      }

      message("本地去除连锁不平衡中......")
      IVs <- tryCatch( clump_data_local(dat_IVs, clump_kb = kb , clump_r2 = r2, pop=pop ,
                                                 bfile = bfile , plink_bin = plink_bin ) ,
                     error = function(e){NULL}  )

     }


     if(is.null(IVs)){ warning( paste0("clump后找不到SNP，建议提高筛选的P值")  )

        return(IVs)

      }else{
        IVs <- IVs %>%
          TwoSampleMR::split_exposure()
      }



    if( any( IVs$exposure == "" ) ){

      suppressMessages( ao<-U1_get_ao() )

      IVs <- IVs %>%
        dplyr::rowwise() %>%
        dplyr::mutate( exposure = ifelse( exposure == '', ao$trait[which(ao$id == id.exposure )],exposure    )   )

        }

   if( !is.null(IVs)  ){
   message("查询样本量中......")
    IVs_exp <- retry( operation = TwoSampleMR::add_metadata,
                      args=list(dat= IVs, cols = c("sample_size", "ncase", "ncontrol", "unit", "sd")) ,
                      max_retries = max_retries)

    if( any( grepl( 'finn', IVs_exp$id.exposure) ) ){
    IVs<- IVs_exp %>%
      dplyr::rowwise() %>%
      dplyr::mutate( samplesize.exposure = ifelse( grep( 'finn', id.exposure), ncase.exposure+ncontrol.exposure,samplesize.exposure    )   )
     }

  }
  return(IVs)
}



extract_instrument.df <- function(outcomes,
                                 p = 5e-08,
                                 clump = "online",
                                 r2 = 0.001,
                                 kb = 10000,
                                 max_retries = 10,
                                 pop = "EUR",
                                 bfile = "",
                                 plink_bin = ""){

    message("推断为数据框")
    suppressPackageStartupMessages(library(TwoSampleMR))
    dat_iv <- subset(outcomes,pval <= p)




    if( is.null(dat_iv) ){ warning( paste0("即使在不clump的情况下也找不到比 ",p," 这个p值更小的SNP，建议提高筛选的P值"))
      return(dat_iv)
    }


    suppressWarnings( suppressMessages(dat_iv <- tryCatch( format_data(dat_iv,type = "exposure"),
                                                            error = function(e){NULL} ) ))
    if( !is.null(dat_iv) ){

    if(clump=="online"){

      message("去除连锁不平衡中......")
      IVs <- retry(operation = clump_data_online,
                           args=list(dat=dat_iv,clump_kb = kb , clump_r2 = r2, pop=pop ),
                           max_retries = max_retries)


      }


    if(clump=="local"){


      message("去除连锁不平衡中......")

      IVs <-tryCatch( clump_data_local(dat_iv, clump_kb = kb , clump_r2 = r2, pop=pop ,
                                       bfile = bfile , plink_bin = plink_bin ) ,
                      error = function(e){NULL}  )


    }

      }else{
      IVs<-NULL
    }


    if(is.null(IVs)){ warning( paste0("clump后找不到SNP，建议提高筛选的P值")  )
         return(IVs) }else{


  return(IVs)
  }


}



extract_instrument.Oneclick <- function( outcomes,
                                        p = 5e-08,
                                        clump = "online",
                                        r2 = 0.001,
                                        kb = 10000,
                                        max_retries = 10,
                                        pop = "EUR",
                                        bfile = "",
                                        plink_bin = ""){


    require(TwoSampleMR)


    message("推断为parquet文件")
    # message("跑这个代码时，尽量不要做其他操作，避免内存管理失败")

    splitsize = 1

    sequence <- 1:length(outcomes)
    chunks <- split(sequence, ceiling(seq_along(sequence)/splitsize))

    d <- list()

    pb <- progress::progress_bar$new(total = length(chunks))

    for (i in 1:length(chunks)  ) {

      dat_IVs <- arrow::open_dataset(outcomes[chunks[[i]]]) %>%
      dplyr::filter( pval <= p) %>%
      dplyr::collect()

      class(dat_IVs)<-c("df",class(dat_IVs))

      suppressMessages(      IVs <- U2_extract_instrument( outcomes=dat_IVs,
                                  p = p ,
                                  clump = clump,
                                  r2 = r2,
                                  kb = kb,
                                  max_retries = max_retries,
                                  pop = pop,
                                  bfile = bfile,
                                  plink_bin = plink_bin ) )


      pb$tick()
      d[[i]] <- IVs
    }

    IVs <- plyr::rbind.fill(d)

    return(IVs)

}

extract_instrument.list<-function( outcomes,
                                   p = 5e-08,
                                   clump = "online",
                                   r2 = 0.001,
                                   kb = 10000,
                                   max_retries = 10,
                                   pop = "EUR",
                                   bfile = "",
                                   plink_bin = ""){

  IVs<-c()
  for (i in 1:length(outcomes)  ){
    d<- U2_extract_instrument( give_class(outcomes[[i]] ) ,
                              p = p ,
                              clump = clump,
                              r2 = r2,
                              kb = kb,
                              max_retries = max_retries,
                              pop = pop,
                              bfile = bfile,
                              plink_bin = plink_bin )
  IVs <- plyr::rbind.fill(IVs,d)
  }



  return(IVs)

}



retry <- function(operation, args, max_retries = 10) {

  for(try_num in 1:max_retries) {

    result <- tryCatch({
      do.call(operation, args)
    }, error = function(e) return("502") )

    if( !identical(result,"502")  ) {
      break
    }
    print(paste("第", try_num, "/",max_retries,"次尝试失败，休眠6到12秒再重试"))
    Sys.sleep( sample(6:12, 1) )

  }

  if( identical(result,"502") ) {
    stop("最大尝试后依然是失败")
  }

  return(result)

}

# 在线clump
clump_data_online <- function(dat,
                             clump_kb = 10000,
                             clump_r2 = 0.001,
                             clump_p = 0.99,
                             pop = "EUR")
{
  dat1 <- dplyr::tibble(
    rsid = dat$SNP,
    pval = dat$pval.exposure,
    id = dat$id.exposure)

  res <- ieugwasr:::api_query("ld/clump",
                              query = list(rsid = dat1[["rsid"]],
                                           pval = dat1[["pval"]],
                                           pthresh = clump_p,
                                           r2 = clump_r2,
                                           kb = clump_kb, pop = pop),
                              access_token = ieugwasr:::check_access_token()
  )

  stopifnot(!res[["status_code"]]=="502")

  res <- res  %>% ieugwasr:::get_query_content()

  y <- subset(dat1, !dat1[["rsid"]] %in% res)

  if (nrow(y) > 0) {
    message("Removing ", length(y[["rsid"]]), " of ", nrow(dat),
            " variants due to LD with other variants or absence from LD reference panel")
  }

  dat2 <- subset(dat, dat$SNP %in% res)

  return(dat2)
}



# 本地clump
clump_data_local <- function(dat,
                            clump_kb = 10000,
                            clump_r2 = 0.001,
                            clump_p = 0.99,
                            pop = "EUR",
                            bfile = "",
                            plink_bin = "")
{
  dat1 <- ieugwasr::ld_clump(
    clump_kb = clump_kb,
    clump_r2 = clump_r2,
    clump_p = clump_p,
    pop = pop,
    dplyr::tibble(
      rsid = dat$SNP,
      pval = dat$pval.exposure,
      id = dat$id.exposure
    ),
    plink_bin = plink_bin,
    bfile = file.path(bfile, pop)

  )
  dat2 <- subset(dat, SNP %in% dat1$rsid)
  return(dat2)
}

give_class<-function(outcomes){
  if(class(outcomes) %in% "list"   ){
    return(outcomes)
  }else{

  if( is.data.frame(outcomes) ){
    if( !any( class(outcomes) %in% "df" ) ){
      stop( "本地数据必须经过U1_Clean_data功能处理" )
    }
  }else{
    if( is.character(outcomes) & !all(file.exists(outcomes))  ){

      if(all(grepl("parquet", outcomes,fixed = TRUE))){
        class(outcomes)<- c("Oneclick" , class(outcomes) )
      }else if(  all(grepl("parquet", outcomes,fixed = TRUE) == FALSE ) ){
        class(outcomes)<- c("openGWAS" , class(outcomes) )

      }else{
        stop("openGWAS的ID不能与parquet文件组合在一起用，除非是list" )
      }

    }
    if( is.character(outcomes) & all(file.exists(outcomes))  ){ class(outcomes)<- c("Oneclick", class(outcomes) )  }
  }

  }




  return(outcomes)
}

